Python: Add Azure Cosmos history provider package #4271
eavanvalkenburg wants to merge 2 commits into `microsoft:main`
Conversation
Pull request overview
Adds a new Python workspace package (`agent-framework-azure-cosmos`) that implements an Azure Cosmos DB–backed `BaseHistoryProvider` for persisting conversation history, along with unit tests and a runnable sample. This supports the Agent Framework's pluggable "context/history provider" story, similar to existing integrations (e.g., Redis).
Changes:
- Introduce the `agent-framework-azure-cosmos` package with `CosmosHistoryProvider` (Cosmos DB transactional batch writes + session partitioning).
- Add unit tests and a package-local sample/README for the Cosmos history provider.
- Wire the new package into the Python workspace (pyproject + uv.lock) and apply minor formatting cleanups in existing tests/modules.
Reviewed changes
Copilot reviewed 15 out of 16 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| python/uv.lock | Adds the new workspace member and locks azure-cosmos dependency. |
| python/pyproject.toml | Registers agent-framework-azure-cosmos as a uv workspace source. |
| python/packages/core/tests/core/test_skills.py | Minor whitespace-only change. |
| python/packages/core/tests/core/test_function_invocation_logic.py | Formatting of assertion messages (single-line f-strings). |
| python/packages/core/agent_framework/_skills.py | Minor formatting/line-wrapping adjustments. |
| python/packages/azure-cosmos/pyproject.toml | New package metadata, dependencies, and tooling config. |
| python/packages/azure-cosmos/agent_framework_azure_cosmos/_history_provider.py | Implements CosmosHistoryProvider (get/save/clear/list + batching + container creation). |
| python/packages/azure-cosmos/agent_framework_azure_cosmos/__init__.py | Exports CosmosHistoryProvider + version discovery. |
| python/packages/azure-cosmos/tests/test_cosmos_history_provider.py | New unit test suite for the provider (mocked Cosmos client/container). |
| python/packages/azure-cosmos/samples/cosmos_history_provider.py | Runnable sample demonstrating agent usage with Cosmos-backed history. |
| python/packages/azure-cosmos/samples/__init__.py | Samples package marker + docstring. |
| python/packages/azure-cosmos/samples/README.md | Sample runner documentation. |
| python/packages/azure-cosmos/README.md | Package-level “getting started” documentation. |
| python/packages/azure-cosmos/LICENSE | Package license file. |
| python/packages/azure-cosmos/AGENTS.md | Package agent documentation (class + import path). |
| python/packages/azure-ai-search/tests/test_aisearch_context_provider.py | Removes redundant local imports + minor formatting. |
```python
query = "SELECT c.message FROM c WHERE c.session_id = @session_id ORDER BY c.sort_key ASC"
parameters: list[dict[str, object]] = [{"name": "@session_id", "value": session_key}]
```
Documents include a `source_id` field, but reads/writes don't use it: `get_messages()` queries only by `session_id`. If multiple history providers (or multiple `source_id`s) share the same container, histories will be mixed, and `clear()` may delete other providers' data. Either remove `source_id` from stored documents, or (preferably) include it in the query filter (and in `clear()`/`list_sessions()` queries) so each provider instance is isolated.
Suggested change:

```python
query = (
    "SELECT c.message FROM c "
    "WHERE c.session_id = @session_id AND c.source_id = @source_id "
    "ORDER BY c.sort_key ASC"
)
parameters: list[dict[str, object]] = [
    {"name": "@session_id", "value": session_key},
    {"name": "@source_id", "value": self.source_id},
]
```
```python
query = "SELECT c.id FROM c WHERE c.session_id = @session_id"
parameters: list[dict[str, object]] = [{"name": "@session_id", "value": session_key}]
items = container.query_items(query=query, parameters=parameters, partition_key=session_key)
```
`clear()` deletes all items for the session partition without filtering by `source_id`. If multiple provider instances share a container, calling `clear()` on one instance can delete another instance's history. Consider including `source_id` in the delete query (and/or using a provider-specific partition key scheme).
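To make the isolation concrete, here is a minimal sketch of how the delete query could be scoped to one provider instance. The helper name `build_clear_query` is illustrative, not part of the PR; it only mirrors the parameterized-query style shown in the hunk above.

```python
def build_clear_query(session_key: str, source_id: str) -> tuple[str, list[dict[str, object]]]:
    """Build a parameterized Cosmos query selecting only this provider's documents.

    Filtering on both session_id and source_id keeps clear() from deleting
    documents written by other provider instances sharing the same container.
    """
    query = (
        "SELECT c.id FROM c "
        "WHERE c.session_id = @session_id AND c.source_id = @source_id"
    )
    parameters: list[dict[str, object]] = [
        {"name": "@session_id", "value": session_key},
        {"name": "@source_id", "value": source_id},
    ]
    return query, parameters

query, params = build_clear_query("s1", "mem")
print(query)
```

The same two-parameter filter would apply to the `get_messages()` query, so both read and delete paths stay consistent.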
```python
query = "SELECT DISTINCT VALUE c.session_id FROM c"
items = container.query_items(query=query, enable_cross_partition_query=True)
```
`list_sessions()` currently lists session IDs across the entire container. If the container can contain data for multiple providers/apps, consider filtering by `source_id` (or documenting that the container must be dedicated to a single provider instance) to avoid returning unrelated sessions.
Suggested change:

```python
query = "SELECT DISTINCT VALUE c.session_id FROM c WHERE c.source_id = @source_id"
parameters: list[dict[str, object]] = [{"name": "@source_id", "value": self.source_id}]
items = container.query_items(
    query=query,
    parameters=parameters,
    enable_cross_partition_query=True,
)
```
```python
credential = AzureCliCredential()
client = AzureOpenAIResponsesClient(
    project_endpoint=project_endpoint,
    deployment_name=deployment_name,
    credential=credential,
)

# 3. Create an agent that uses the history provider as a context provider.
async with (
    CosmosHistoryProvider(
        endpoint=cosmos_endpoint,
        database_name=cosmos_database_name,
        container_name=cosmos_container_name,
        credential=cosmos_key or credential,
    ) as history_provider,
    client.as_agent(
        name="CosmosHistoryAgent",
        instructions="You are a helpful assistant that remembers prior turns.",
        context_providers=[history_provider],
        default_options={"store": False},
    ) as agent,
):
    # 4. Create a session (session_id is used as the partition key).
    session = agent.create_session()

    # 5. Run a multi-turn conversation; history is persisted by CosmosHistoryProvider.
    response1 = await agent.run("My name is Ada and I enjoy distributed systems.", session=session)
    print(f"Assistant: {response1.text}")

    response2 = await agent.run("What do you remember about me?", session=session)
    print(f"Assistant: {response2.text}")
    print(f"Container: {history_provider.container_name}")
```
`AzureCliCredential()` (aio) is created here but never closed. Please ensure the sample disposes it properly (e.g., `async with AzureCliCredential() as credential:` or `await credential.close()` in a `finally`) to avoid leaking network resources during repeated runs.
Suggested change:

```python
async with AzureCliCredential() as credential:
    client = AzureOpenAIResponsesClient(
        project_endpoint=project_endpoint,
        deployment_name=deployment_name,
        credential=credential,
    )

    # 3. Create an agent that uses the history provider as a context provider.
    async with (
        CosmosHistoryProvider(
            endpoint=cosmos_endpoint,
            database_name=cosmos_database_name,
            container_name=cosmos_container_name,
            credential=cosmos_key or credential,
        ) as history_provider,
        client.as_agent(
            name="CosmosHistoryAgent",
            instructions="You are a helpful assistant that remembers prior turns.",
            context_providers=[history_provider],
            default_options={"store": False},
        ) as agent,
    ):
        # 4. Create a session (session_id is used as the partition key).
        session = agent.create_session()

        # 5. Run a multi-turn conversation; history is persisted by CosmosHistoryProvider.
        response1 = await agent.run("My name is Ada and I enjoy distributed systems.", session=session)
        print(f"Assistant: {response1.text}")

        response2 = await agent.run("What do you remember about me?", session=session)
        print(f"Assistant: {response2.text}")
        print(f"Container: {history_provider.container_name}")
```
```python
"""
This sample demonstrates CosmosHistoryProvider as an agent context provider.

Key components:
- AzureOpenAIResponsesClient configured with an Azure AI project endpoint
```
For consistency with existing samples, consider moving this descriptive triple-quoted block to come after the load_dotenv() call (rather than before it). This matches the structure used throughout python/samples/* and keeps environment loading in a consistent place.
```python
# Load environment variables from .env file.
load_dotenv()
```
`load_dotenv()` is called after the descriptive triple-quoted block; in existing samples the .env load happens immediately after imports. Consider moving `load_dotenv()` up so env vars are loaded before any sample configuration text/instructions are presented, matching `python/samples/SAMPLE_GUIDELINES.md`.
```python
self._container: ContainerProxy | None = container_client
self._owns_client = False

if self._container is not None:
```
When `container_client` is provided, `__init__` returns early, so the `database_name`/`container_name` attributes are never set (and any provided `database_name`/`container_name` args are ignored). This can lead to `AttributeError` for callers that log/introspect these properties. Consider setting these attributes even in the injected-container path, or clearly documenting that they may be unset when `container_client` is used.
Suggested change:

```python
if self._container is not None:
    # When a container_client is provided, we may still want database/container
    # names for logging or introspection purposes. Use any explicitly supplied
    # values; they may be None if not provided.
    self.database_name = database_name
    self.container_name = container_name
```
moonbox3 left a comment
Automated Code Review
Reviewers: 3 | Confidence: 80%
✗ Correctness
The diff adds a new Azure Cosmos DB history provider package with solid implementation and tests, applies formatting-only changes to several existing files, and removes six local `from agent_framework import Content` imports from test methods in `test_aisearch_context_provider.py`. The removal of these local imports is suspicious: there is no corresponding addition of a module-level import for `Content` anywhere in the diff, which would cause a `NameError` at runtime in all six affected test methods. The rest of the changes (new Cosmos package, formatting, pytest marker additions) are correct.
✓ Security Reliability
New Azure Cosmos DB history provider is generally well-structured: parameterized queries prevent injection, credential handling uses `SecretString` properly, and resource ownership is tracked for cleanup. The main concerns are: (1) a `None` session_id silently maps to a shared `"default"` partition, risking unintended cross-session data leakage; (2) partial batch failures in `save_messages`/`clear` are unhandled, leaving data in an inconsistent state; and (3) `__aexit__` can swallow the original exception if `close()` also raises. The remaining changes are formatting-only or test import hoisting and carry no risk.
✓ Test Coverage
The new CosmosHistoryProvider has solid test coverage for core CRUD operations (get, save, clear, list_sessions), initialization variants, lifecycle management (close, async context manager), and before/after run hooks. However, several non-trivial code paths lack tests: the batch-splitting logic when operations exceed _BATCH_OPERATION_LIMIT (100), the _session_partition_key fallback to 'default' when session_id is None, the _resolve_credential ValueError path when neither credential nor env key is provided, the RuntimeError in _get_container when database_client is None, and the filtering of non-dict message payloads in get_messages. The existing tests in test_aisearch_context_provider.py had only import-cleanup changes (moving Content import to module level) and formatting, which are fine.
Blocking Issues
- Six test methods in `test_aisearch_context_provider.py` remove the local `from agent_framework import Content` import, but no module-level import for `Content` is added in this diff, which will cause `NameError: name 'Content' is not defined` when running these tests.
Suggestions
- Consider adding `from agent_framework import Content` as a module-level import at the top of `test_aisearch_context_provider.py` (alongside the existing imports) to replace the removed local imports.
- Consider raising an error or generating a unique key when `session_id is None` in `_session_partition_key` rather than silently falling back to a shared `"default"` partition, which can leak messages across unrelated callers.
- Add error handling around `execute_item_batch` in `save_messages` and `clear` so a partial batch failure doesn't silently leave data in an inconsistent state (e.g., log the error, raise, or retry the remaining batch).
- Guard `close()` inside `__aexit__` with a try/except so that a failure during cleanup doesn't mask the original exception that triggered the exit.
- Add a test for batch splitting in `save_messages` and `clear` when the number of items exceeds `_BATCH_OPERATION_LIMIT` (100). This is a non-trivial loop that could silently lose data if broken.
- Add a test for `_session_partition_key` returning `"default"` when `session_id` is `None`; this is a meaningful behavioral contract that callers may depend on.
- Add a test for `_resolve_credential` raising `ValueError` when both `credential` and the settings key are `None` (the third branch in that method).
- Add a test for `_get_container` raising `RuntimeError` when `_database_client` is `None`, to ensure the guard clause works correctly.
- Add a test for `get_messages` skipping non-dict message payloads (e.g., a malformed item with a string or `None` "message" field), since the code explicitly checks `isinstance(message_payload, dict)`.
- The after_run test (`test_after_run_stores_input_and_response`) only asserts on `len(batch_operations) == 2` but doesn't verify the content of the stored messages (roles, text). Consider adding assertions on the actual message payloads to match the thoroughness of `test_saves_messages`.
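The batch error-handling suggestion above can be sketched as a small wrapper. Everything here is hypothetical (the wrapper name, the `committed` counter, and the stub batch function are illustrative); the point is only that a failed batch should surface how much was already committed instead of leaving callers guessing:

```python
import asyncio

async def execute_batch_safely(execute_item_batch, batch_operations, partition_key, committed):
    """Hypothetical wrapper around a Cosmos batch call.

    On failure, report how many operations were committed before the error so
    a partial failure doesn't silently leave data in an inconsistent state.
    """
    try:
        await execute_item_batch(batch_operations=batch_operations, partition_key=partition_key)
    except Exception as exc:
        raise RuntimeError(
            f"Cosmos batch failed after {committed} operations were committed "
            f"for partition {partition_key!r}; remaining operations were not applied."
        ) from exc

async def _demo():
    # Stub standing in for ContainerProxy.execute_item_batch failing mid-sequence.
    async def failing_batch(batch_operations, partition_key):
        raise ConnectionError("transient network failure")

    try:
        await execute_batch_safely(failing_batch, [("create", {})], "s1", committed=100)
    except RuntimeError as err:
        return str(err)

message = asyncio.run(_demo())
print(message)
```

A retry policy (or re-raising with the original Cosmos error chained, as done here via `from exc`) could be layered on the same seam.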
Automated review by moonbox3's agents
```python
def test_image_uri_content(self) -> None:
    from agent_framework import Content
```
This line uses Content.from_uri(...) but the local from agent_framework import Content import was removed in this hunk and no module-level import for Content is visible in the diff. If Content is not already imported at the top of this file, all six affected test methods will fail with NameError. Please verify a module-level import exists, or add one.
```python
async def get_messages(self, session_id: str | None, **kwargs: Any) -> list[Message]:
    """Retrieve stored messages for this session from Azure Cosmos DB."""
    session_key = self._session_partition_key(session_id)
    container = await self._get_container()
```
_session_partition_key returns a hard-coded "default" when session_id is None. All callers that omit a session ID will silently share one partition and see each other's messages. Consider raising a ValueError or generating a random key instead to prevent accidental cross-session data leakage.
Suggested change:

```python
@staticmethod
def _session_partition_key(session_id: str | None) -> str:
    if session_id is None:
        raise ValueError("session_id is required for CosmosHistoryProvider operations.")
    return session_id
```
```python
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
```
__aexit__ unconditionally awaits close(). If close() raises (e.g., network error closing the Cosmos client) while an exception is already propagating, the original exception is lost. Wrap in try/except to preserve the original error.
Suggested change:

```python
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None:
    """Async context manager exit."""
    try:
        await self.close()
    except Exception:
        if exc_type is None:
            raise
```
```python
class TestCosmosHistoryProviderSaveMessages:
    async def test_saves_messages(self, mock_container: MagicMock) -> None:
        provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
        messages = [Message(role="user", contents=["Hello"]), Message(role="assistant", contents=["Hi"])]
```
Consider adding a test that calls save_messages with more than _BATCH_OPERATION_LIMIT (100) messages to verify the batching loop issues multiple execute_item_batch calls. This is a data-integrity-critical code path.
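The batching loop being asked about can be exercised without any Cosmos dependency. This is a self-contained sketch, not the PR's test code: `FakeContainer` and `save_in_batches` are hypothetical stand-ins that only reproduce the chunk-by-limit shape the review wants verified.

```python
import asyncio

BATCH_OPERATION_LIMIT = 100  # mirrors the provider's _BATCH_OPERATION_LIMIT

class FakeContainer:
    """Minimal stand-in for a Cosmos ContainerProxy that records batch sizes."""
    def __init__(self) -> None:
        self.batches: list[int] = []

    async def execute_item_batch(self, batch_operations, partition_key):
        self.batches.append(len(batch_operations))

async def save_in_batches(container, operations, partition_key):
    # Hypothetical reproduction of the provider's batching loop: slice the
    # operation list into chunks of at most BATCH_OPERATION_LIMIT entries.
    for start in range(0, len(operations), BATCH_OPERATION_LIMIT):
        await container.execute_item_batch(
            batch_operations=operations[start : start + BATCH_OPERATION_LIMIT],
            partition_key=partition_key,
        )

container = FakeContainer()
asyncio.run(save_in_batches(container, [("create", i) for i in range(250)], "s1"))
print(container.batches)  # [100, 100, 50]
```

An equivalent test against the real provider would feed 250 messages through `save_messages` and assert three `execute_item_batch` awaits with these sizes.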
```python
assert messages[0].text == "Hello"
assert messages[1].role == "assistant"
assert messages[1].text == "Hi"
```
No test covers get_messages(None) which exercises the _session_partition_key fallback to "default". This is an important behavioral contract worth explicitly verifying.
Suggested change:

```python
async def test_empty_returns_empty(self, mock_container: MagicMock) -> None:
    mock_container.query_items.return_value = _to_async_iter([])
    provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
    messages = await provider.get_messages("s1")
    assert messages == []

async def test_none_session_id_uses_default_partition(self, mock_container: MagicMock) -> None:
    mock_container.query_items.return_value = _to_async_iter([])
    provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
    await provider.get_messages(None)
    call_kwargs = mock_container.query_items.call_args.kwargs
    assert call_kwargs["partition_key"] == "default"
```
```python
raise ValueError(
    "Azure Cosmos credential is required. Provide 'credential' or set 'AZURE_COSMOS_KEY' environment variable."
)
```
The _resolve_credential ValueError branch (line 147-149) when neither credential nor settings key is provided has no corresponding test. Add a test that passes credential=None with no AZURE_COSMOS_KEY env var set to verify this error path.
```python
async def test_after_run_stores_input_and_response(self, mock_container: MagicMock) -> None:
    provider = CosmosHistoryProvider(source_id="mem", container_client=mock_container)
    session = AgentSession(session_id="test")
    context = SessionContext(input_messages=[Message(role="user", contents=["hi"])], session_id="s1")
```
This test asserts len(batch_operations) == 2 but does not verify the actual message content. Consider also asserting on the roles and text of the stored messages to match the thoroughness of test_saves_messages.
Suggested change:

```python
mock_container.execute_item_batch.assert_awaited_once()
batch_operations = mock_container.execute_item_batch.await_args.kwargs["batch_operations"]
assert len(batch_operations) == 2
input_doc = batch_operations[0][1][0]
assert input_doc["message"]["role"] == "user"
response_doc = batch_operations[1][1][0]
assert response_doc["message"]["role"] == "assistant"
```
Summary
- Add `agent-framework-azure-cosmos` package with `CosmosHistoryProvider` for conversation history in Azure Cosmos DB
- Use `execute_item_batch` and add `list_sessions`

Validation
- `uv run --directory packages/azure-cosmos poe test`
- `uv run --directory packages/azure-cosmos poe lint`
- `uv run --directory packages/azure-cosmos poe pyright`
- `uv run --directory packages/azure-cosmos poe mypy`

Fixes #1390